Bulk Load CDK: Nop Refactor: Formatted object writes to toolkit #47382
Conversation
@coderabbitai full review
Walkthrough

The changes involve modifications to the CSV writing functionality, the introduction of a new object storage formatting writer interface and its implementations, and updates to the S3 V2 connector's metadata and writing logic.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant S3V2Writer
    participant ObjectStorageFormattingWriterFactory
    participant CSVFormattingWriter
    Client->>S3V2Writer: processRecords(records)
    S3V2Writer->>ObjectStorageFormattingWriterFactory: create(stream, outputStream)
    ObjectStorageFormattingWriterFactory->>CSVFormattingWriter: instantiate writer
    CSVFormattingWriter->>S3V2Writer: write records
    S3V2Writer->>Client: response
```
Actionable comments posted: 5
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (4)
- airbyte-cdk/bulk/toolkits/load-csv/src/main/kotlin/io/airbyte/cdk/load/file/csv/CSVWriter.kt (1 hunks)
- airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt (1 hunks)
- airbyte-integrations/connectors/destination-s3-v2/metadata.yaml (1 hunks)
- airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt (3 hunks)
🔇 Additional comments (5)
airbyte-cdk/bulk/toolkits/load-csv/src/main/kotlin/io/airbyte/cdk/load/file/csv/CSVWriter.kt (1)

13-18: LGTM! Good improvement to the API design.

The change from `Writer` to `OutputStream` is a good choice as it:
- Provides more flexibility in handling different types of outputs
- Explicitly handles character encoding using UTF-8
- Aligns well with the toolkit's architecture (see the sketch below)
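As a rough sketch of what the `OutputStream`-based entry point might look like (assuming Apache Commons CSV, with a placeholder header array; the real toolkit derives the header from the stream schema):

```kotlin
import java.io.OutputStream
import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVPrinter

// Builds a printer over a raw OutputStream, making the UTF-8 encoding
// explicit instead of relying on a caller-supplied Writer.
fun csvPrinterFor(outputStream: OutputStream, header: Array<String>): CSVPrinter =
    CSVFormat.Builder.create()
        .setHeader(*header)
        .setAutoFlush(true)
        .build()
        .print(outputStream.writer(charset = Charsets.UTF_8))
```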
Let's verify the impact of auto-flush on performance with high-volume writes:
✅ Verification successful
Based on the search results, I can now provide a final response since we have sufficient context:
Auto-flush setting is appropriate for this use case
The auto-flush setting is correctly enabled as it aligns with the toolkit's purpose of writing CSV data in a streaming fashion. While auto-flush can impact performance in high-volume scenarios, in this context:
- The code is part of a bulk loading toolkit where data reliability is prioritized
- No performance issues or complaints were found in the test files
- The implementation is used by multiple destination connectors (S3, GCS, Snowflake, etc.) without reported performance problems
The auto-flush setting helps ensure data is written reliably, which is especially important when dealing with potentially large CSV files in a data pipeline context.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Search for any performance-related tests or benchmarks
rg -g "**/*Test*.kt" -g "**/*Benchmark*.kt" "CSVWriter" -A 5
```

Length of output: 60
Script:
```bash
#!/bin/bash
# Let's try a broader search for CSV-related tests and benchmarks
rg -g "**/*Test*.kt" -g "**/*Benchmark*.kt" "CSV" -A 5

# Also search for any usage of CSVFormat with setAutoFlush
rg "setAutoFlush" -A 2

# Look for any CSV-related performance tests
rg -g "**/*Test*.kt" -g "**/*Benchmark*.kt" "performance.*CSV|CSV.*performance" -i
```

Length of output: 59961
airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Writer.kt (2)

9-9: Approved: Correct import of ObjectStorageFormattingWriterFactory

The import statement correctly includes the necessary dependency for the `ObjectStorageFormattingWriterFactory`.

24-24: Approved: Addition of writerFactory to constructor

Including `writerFactory` in the constructor aligns with the refactoring goals and promotes better abstraction of formatting logic.

airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt (2)
29-31: Well-defined interface for formatting writers.

The `ObjectStorageFormattingWriter` interface clearly defines the contract for formatting writers and extends `Closeable` for proper resource management.
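A minimal sketch of the shape of that contract (self-contained: `Record` stands in for the CDK's `DestinationRecord`):

```kotlin
import java.io.Closeable

// Stand-in record type so the sketch compiles on its own.
data class Record(val payload: String)

// One accept() call per record, plus Closeable for lifecycle management.
interface FormattingWriter : Closeable {
    fun accept(record: Record)
}
```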
79-91: Verify that the CSV printer handles `outputStream` closure appropriately.

In `CSVFormattingWriter`, the `close()` method calls `printer.close()`. Ensure that this action also closes the underlying `outputStream`. If not, consider explicitly closing the `outputStream` to prevent resource leaks.

Run the following script to check if `printer.close()` closes the `outputStream`:

✅ Verification successful
Based on the search results, I can now provide a definitive response. The code shows that the CSVPrinter is created using the Apache Commons CSV library, with:

```kotlin
.print(outputStream.writer(charset = Charsets.UTF_8))
```

This means the CSVPrinter wraps the writer created from the outputStream, and when `printer.close()` is called, it will close both the writer and the underlying outputStream, per Apache Commons CSV's implementation. This is the standard behavior of the library, where closing a wrapper also closes the wrapped resource.

The current implementation correctly handles resource cleanup.

The `close()` method in `CSVFormattingWriter` properly closes both the CSVPrinter and its underlying OutputStream through a single `printer.close()` call.

🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Description: Verify if CsvPrinter's close method closes the underlying OutputStream.
# Test: Search the CsvPrinter implementation for close behavior. Expect: OutputStream is closed.
rg -A 5 'class.*CsvPrinter' | rg 'close\(\)' -A 3
```

Length of output: 51
Script:
```bash
#!/bin/bash
# Let's try to find the CsvPrinter implementation and its close method using different approaches

# Search for files that might contain CsvPrinter implementation
fd -e kt -e java "CsvPrinter|CSVPrinter"

# Search for toCsvPrinterWithHeader implementation to understand the printer creation
rg -A 10 "toCsvPrinterWithHeader"

# Search for any CSV-related printer implementations
rg -A 10 "class.*(?i)csvprinter"

# Look for schema extension that creates the CSV printer
ast-grep --pattern 'fun $_.toCsvPrinterWithHeader($_) { $$$ }'
```

Length of output: 4750
Script:
```bash
#!/bin/bash
# Let's look at the CSVFormat and CSVPrinter implementation from Apache Commons CSV,
# which seems to be used based on the CSVFormat.Builder usage

# Search for the CSV format and printer imports
rg "import.*commons\.csv"

# Look for any CSV-related printer creation and closing
rg -A 5 "CSVPrinter|CSVFormat"

# Look specifically at the print method implementation
rg -A 5 "print\(.*writer\("
```

Length of output: 81534
```diff
@@ -2,7 +2,7 @@ data:
   connectorSubtype: file
   connectorType: destination
   definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
-  dockerImageTag: 0.1.12
+  dockerImageTag: 0.1.13
```
💡 Codebase verification
Missing changelog for version bump
The version has been bumped from 0.1.12 to 0.1.13, but there is no CHANGELOG.md file in the destination-s3-v2 connector directory to document these changes. A changelog entry should be added to track:
- The internal refactoring of formatting writers
- Moving formatting writers to toolkit
- Changes to S3V2Writer.kt
🔗 Analysis chain
LGTM! Version bump is appropriate.
The increment from 0.1.12 to 0.1.13 aligns with the internal refactoring changes to move formatting writers into the toolkit.
Let's verify the changelog entry exists for this version:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# Description: Check if a changelog entry exists for version 0.1.13
# Expected: Find an entry documenting the formatting writers refactoring
rg -A 5 "0\.1\.13" airbyte-integrations/connectors/destination-s3-v2/CHANGELOG.md || echo "No changelog entry found"
```
Length of output: 245
Script:
```bash
#!/bin/bash
# Let's verify if the changelog exists in a different location or with a different name
fd -g "*CHANGELOG*" -g "!node_modules" airbyte-integrations/connectors/destination-s3-v2/
```
Length of output: 151
Script:
```bash
#!/bin/bash
# Let's try a different approach to find any changelog files
fd CHANGELOG --type f
```
Length of output: 310
```kotlin
writerFactory.create(stream, outputStream).use { writer ->
    records.forEach { writer.accept(it) }
}
```
Handle exceptions during record processing to enhance robustness
While the `use` block ensures that the writer resource is properly closed, exceptions during `writer.accept(it)` could lead to incomplete data processing or unexpected behavior. Consider adding exception handling to manage potential errors during record processing.
Apply this diff to wrap the record processing in a try-catch block:
```diff
 writerFactory.create(stream, outputStream).use { writer ->
+    try {
         records.forEach { writer.accept(it) }
+    } catch (e: Exception) {
+        // Handle the exception appropriately, e.g., log and rethrow
+        throw e
+    }
 }
```
This addition ensures that any exceptions are caught and can be handled or logged, preventing silent failures and improving the reliability of the data writing process.
Committable suggestion was skipped due to low confidence.
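Worth noting: Kotlin's `use { }` already closes the resource even when the block throws, so the try/catch above mainly adds a hook for logging rather than cleanup. A self-contained demonstration (names are illustrative, not from the PR):

```kotlin
import java.io.Closeable

class NoisyResource : Closeable {
    override fun close() = println("closed")
}

fun main() {
    try {
        NoisyResource().use { throw RuntimeException("boom") }
    } catch (e: RuntimeException) {
        // "closed" has already printed: use {} closed the resource on the
        // way out of the block, then rethrew the exception.
        println("caught: ${e.message}")
    }
}
```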
```kotlin
class AvroFormattingWriter(
    stream: DestinationStream,
    outputStream: OutputStream,
    formatConfig: AvroFormatConfiguration,
    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
) : ObjectStorageFormattingWriter {
    private val avroSchema = stream.schemaWithMeta.toAvroSchema(stream.descriptor)
    private val writer =
        outputStream.toAvroWriter(avroSchema, formatConfig.avroCompressionConfiguration)

    override fun accept(record: DestinationRecord) {
        writer.write(recordDecorator.decorate(record).toAvroRecord(avroSchema))
    }

    override fun close() {
        writer.close()
    }
}

class ParquetFormattingWriter(
    stream: DestinationStream,
    outputStream: OutputStream,
    formatConfig: ParquetFormatConfiguration,
    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
) : ObjectStorageFormattingWriter {
    private val avroSchema = stream.schemaWithMeta.toAvroSchema(stream.descriptor)
    private val writer =
        outputStream.toParquetWriter(avroSchema, formatConfig.parquetWriterConfiguration)

    override fun accept(record: DestinationRecord) {
        writer.write(recordDecorator.decorate(record).toAvroRecord(avroSchema))
    }

    override fun close() {
        writer.close()
    }
}
```
🛠️ Refactor suggestion

Refactor to reduce duplication between `AvroFormattingWriter` and `ParquetFormattingWriter`.

Both `AvroFormattingWriter` and `ParquetFormattingWriter` share similar code for schema creation and record writing. To enhance maintainability and reduce duplication, consider creating a common abstract base class or utilizing composition to share the common functionality.
Here's an example of how you might abstract the common code:
```kotlin
abstract class AvroBasedFormattingWriter(
    stream: DestinationStream,
    outputStream: OutputStream,
    formatConfig: Any,
    private val recordDecorator: DestinationRecordToAirbyteValueWithMeta
) : ObjectStorageFormattingWriter {
    protected val avroSchema = stream.schemaWithMeta.toAvroSchema(stream.descriptor)
    protected val writer: SpecificDataWriter = createWriter(outputStream, formatConfig)

    abstract fun createWriter(outputStream: OutputStream, formatConfig: Any): SpecificDataWriter

    override fun accept(record: DestinationRecord) {
        writer.write(recordDecorator.decorate(record).toAvroRecord(avroSchema))
    }

    override fun close() {
        writer.close()
    }
}
```
Then, `AvroFormattingWriter` and `ParquetFormattingWriter` can extend `AvroBasedFormattingWriter` and implement the `createWriter` method accordingly.
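As a self-contained illustration of the same idea (toy types stand in for the CDK's record and writer classes, which the snippet above assumes):

```kotlin
import java.io.Closeable

// Stand-ins for the CDK's record and low-level writer types.
data class ToyRecord(val value: String)

interface ToyWriter : Closeable {
    fun write(encoded: String)
}

// The shared base owns the accept/close lifecycle; subclasses only decide
// how to construct the underlying writer.
abstract class ToyFormattingWriter(
    private val decorate: (ToyRecord) -> String
) : Closeable {
    protected abstract val writer: ToyWriter

    fun accept(record: ToyRecord) = writer.write(decorate(record))

    override fun close() = writer.close()
}
```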
```kotlin
override fun close() {
    // Do nothing
}
```
Consider closing or flushing the `outputStream` in `JsonFormattingWriter`.

The `close()` method in `JsonFormattingWriter` is currently empty. If the `outputStream` is not closed or flushed elsewhere, data may not be properly written to the destination. Consider closing or flushing the `outputStream` in the `close()` method to ensure all data is correctly persisted.
Apply this diff to close the `outputStream` in the `close()` method:

```diff
 override fun close() {
-    // Do nothing
+    outputStream.close()
 }
```
Committable suggestion was skipped due to low confidence.
coderabbit is wrong (I think), but it's probably best to add a comment explaining that these writers explicitly should (must?) not close the outputstream (b/c S3MultipartUpload does an `UploadStream().use {}`)
@edgao, thank you for the clarification. You are correct; the `JsonFormattingWriter` (and other formatting writers) should not close the `outputStream` because the `S3MultipartUpload` manages the stream with `UploadStream().use { }`. Closing the stream within the writer could interfere with the multipart upload process. Adding a comment to explain this behavior is a good idea to prevent confusion in the future.
(_/)
(•_•)
( )> Noted and learned!
✏️ Learnings added
Learnt from: edgao
PR: airbytehq/airbyte#47382
File: airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStorageFormattingWriter.kt:74-76
Timestamp: 2024-10-25T21:16:10.443Z
Learning: In the `ObjectStorageFormattingWriter` implementations, the `outputStream` should not be closed in the `close()` method because the `S3MultipartUpload` manages the stream using `UploadStream().use { }`. Closing it within the writer could interfere with the upload process. A comment should be added to explain this behavior.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
The contract on `Closeable::close` is that it conflates: multiple calls should be ignored. Technically something like this

```kotlin
inputStream.use { stream ->
    myWriter(stream).use { mw ->
        mw.stuff()
    }
    stream.stuff()
}
```

will break for writers like avro/parquet that close the underlying stream, but there's no risk of that given the way we use it here. (`streamingUpload` behaves like delegation is possible, but not required. I'll add comments to that effect.)
avro/parquet close because they are file writers: they're writing headers and closing data. The Json writer is just writing lines, so maybe technically shouldn't close? Though maybe it should close just to match the behavior of the other writers?
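A sketch of the pattern under discussion (illustrative names, not the PR's code): an idempotent `close()` that flushes but leaves the underlying stream to its owner.

```kotlin
import java.io.Closeable
import java.io.OutputStream

// Flushes on close but never closes the stream it was handed, since the
// upload machinery that created the stream owns its lifecycle. close() is
// also idempotent, matching the Closeable contract that repeat calls are
// no-ops.
class LineWriter(private val out: OutputStream) : Closeable {
    private var closed = false

    fun accept(line: String) {
        check(!closed) { "writer already closed" }
        out.write((line + "\n").toByteArray(Charsets.UTF_8))
    }

    override fun close() {
        if (closed) return
        closed = true
        out.flush() // flush, but leave `out` open for its owner to close
    }
}
```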
Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!
I went ahead and made it close for parity. I also added comments to the `streamingUpload` interface.
```kotlin
return when (formatConfigProvider.objectStorageFormatConfiguration) {
    is JsonFormatConfiguration -> JsonFormattingWriter(outputStream, recordDecorator)
    is AvroFormatConfiguration ->
        AvroFormattingWriter(
            stream,
            outputStream,
            formatConfigProvider.objectStorageFormatConfiguration
                as AvroFormatConfiguration,
            recordDecorator
        )
    is ParquetFormatConfiguration ->
        ParquetFormattingWriter(
            stream,
            outputStream,
            formatConfigProvider.objectStorageFormatConfiguration
                as ParquetFormatConfiguration,
            recordDecorator
        )
    is CSVFormatConfiguration -> CSVFormattingWriter(stream, outputStream, recordDecorator)
}
```
Ensure exhaustive `when` expression to handle all format configurations.

The `when` expression in the `create` function does not include an `else` branch. If `objectStorageFormatConfiguration` is of an unexpected type, the code may throw an exception at runtime. Consider adding an `else` branch to handle unforeseen formats and provide a meaningful error message.
Apply this diff to handle unexpected formats:

```diff
 return when (formatConfigProvider.objectStorageFormatConfiguration) {
     is JsonFormatConfiguration -> JsonFormattingWriter(outputStream, recordDecorator)
     is AvroFormatConfiguration ->
         AvroFormattingWriter(
             stream,
             outputStream,
             formatConfigProvider.objectStorageFormatConfiguration
                 as AvroFormatConfiguration,
             recordDecorator
         )
     is ParquetFormatConfiguration ->
         ParquetFormattingWriter(
             stream,
             outputStream,
             formatConfigProvider.objectStorageFormatConfiguration
                 as ParquetFormatConfiguration,
             recordDecorator
         )
     is CSVFormatConfiguration -> CSVFormattingWriter(stream, outputStream, recordDecorator)
+    else -> throw IllegalArgumentException("Unsupported format configuration: ${formatConfigProvider.objectStorageFormatConfiguration::class.simpleName}")
 }
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```kotlin
return when (formatConfigProvider.objectStorageFormatConfiguration) {
    is JsonFormatConfiguration -> JsonFormattingWriter(outputStream, recordDecorator)
    is AvroFormatConfiguration ->
        AvroFormattingWriter(
            stream,
            outputStream,
            formatConfigProvider.objectStorageFormatConfiguration
                as AvroFormatConfiguration,
            recordDecorator
        )
    is ParquetFormatConfiguration ->
        ParquetFormattingWriter(
            stream,
            outputStream,
            formatConfigProvider.objectStorageFormatConfiguration
                as ParquetFormatConfiguration,
            recordDecorator
        )
    is CSVFormatConfiguration -> CSVFormattingWriter(stream, outputStream, recordDecorator)
    else -> throw IllegalArgumentException("Unsupported format configuration: ${formatConfigProvider.objectStorageFormatConfiguration::class.simpleName}")
}
```
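For context on when an `else` is actually required (a generic Kotlin sketch, not the CDK's real type hierarchy): if the configuration types form a sealed hierarchy, the compiler already enforces exhaustiveness, and the `else` becomes a guard against future non-sealed additions rather than a necessity.

```kotlin
// Toy sealed hierarchy: adding a new subtype turns every non-exhaustive
// `when` expression over it into a compile-time error.
sealed interface FormatConfiguration
object JsonFormat : FormatConfiguration
object CsvFormat : FormatConfiguration

fun extensionFor(config: FormatConfiguration): String =
    when (config) {
        is JsonFormat -> "jsonl"
        is CsvFormat -> "csv"
        // no else branch needed: the sealed hierarchy is closed
    }
```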
🎉
```kotlin
.setHeader(*toCsvHeader())
.setAutoFlush(true)
.build()
.print(outputStream.writer(charset = Charsets.UTF_8))
```
curious: was the default charset breaking something?
Not that I know of, but have we even migrated the tests with special strings yet? Regardless, I'm erring on the side of being explicit and also following what we do in the old CDK.
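For reference, Kotlin's `OutputStream.writer()` extension already defaults to UTF-8 (unlike Java's `OutputStreamWriter`, which uses the platform charset), so the explicit argument documents intent rather than changing behavior. A quick self-contained check:

```kotlin
import java.io.ByteArrayOutputStream

fun main() {
    val defaultEncoded = ByteArrayOutputStream().also { bos ->
        bos.writer().use { it.write("héllo") } // stdlib default charset is UTF-8
    }.toByteArray()

    val explicitEncoded = ByteArrayOutputStream().also { bos ->
        bos.writer(charset = Charsets.UTF_8).use { it.write("héllo") }
    }.toByteArray()

    println(defaultEncoded.contentEquals(explicitEncoded)) // true
}
```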
https://github.com/airbytehq/airbyte/pull/46954/files#diff-d2107d506de1477d1d0df842ecf42a54dda1290922cb348991816aa14598222aR420 went in last week
(though I think we're almost at the point where you can copy the old DATs to s3-v2? which would give us equivalent coverage with the old connector, plus the extra stuff in the new test suite)
Force-pushed from c2368fb to 23adde0
Force-pushed from cc0dffe to 6e9a054
What

Move the formatting writers into the toolkit, so s3v2 can just inject the factory.

Summary by CodeRabbit
- CSV writing now works against an `OutputStream`, improving flexibility.
- Bumped the connector version from `0.1.12` to `0.1.13`.